/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.core.consumer.cluster.impl;

import com.lvyh.lightframe.core.provider.ProviderHelper;
import com.lvyh.lightframe.core.provider.ProviderInfo;
import com.lvyh.lightframe.core.consumer.cluster.AbstractCluster;
import com.lvyh.lightframe.core.consumer.route.LoadBalance;
import com.lvyh.lightframe.core.consumer.transport.AbstractClient;
import com.lvyh.lightframe.common.ext.ExtensionLoaderFactory;
import com.lvyh.lightframe.common.ext.Spi;
import com.lvyh.lightframe.core.invoke.request.RpcRequest;
import com.lvyh.lightframe.core.invoke.response.RpcResponse;
import com.lvyh.lightframe.common.exception.RpcRuntimeException;
import com.lvyh.lightframe.common.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Automatic recovery of failure, background record of failure request,
 * regular resend, usually used for message notification operation
 */
@Spi("failback")
public class FailbackCluster extends AbstractCluster {
    private Logger logger = LoggerFactory.getLogger(FailbackCluster.class);
    private static final long RETRY_FAILED_PERIOD = 5 * 1000;

    public FailbackCluster(LoadBalance loadBalance) {
        super(loadBalance);
    }

    private final ScheduledExecutorService scheduledExecutorService = ThreadPoolUtils.newScheduledThreadPool("failBack-cluster-timer");
    private final ConcurrentMap<RpcRequest, AbstractCluster> failed = new ConcurrentHashMap<>();
    private volatile ScheduledFuture<?> retryFuture;

    @Override
    public RpcResponse doInvoke(RpcRequest request) throws RpcRuntimeException {
        ProviderInfo providerInfo = select(request, loadBalance);
        request.setServerAddress(ProviderHelper.convert(providerInfo));
        try {
            AbstractClient client = ExtensionLoaderFactory.getExtensionLoader(AbstractClient.class).getExtension(request.getTransport());
            RpcResponse response = doInvoke(client, request);
            return response;
        } catch (Exception e) {
            logger.error("FailBack to invoke method {}, wait for retry in background. Ignored exception: {}", request.getMethodName(), e.getMessage(), e);
            addFailed(request, this);
        }
        return new RpcResponse();
    }

    /**
     * Add to failure queue
     */
    private void addFailed(RpcRequest rpcRequest, AbstractCluster cluster) {
        if (retryFuture == null) {
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
                        // Collect statistics
                        try {
                            retryFailed();
                        } catch (Exception t) {
                            // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(rpcRequest, cluster);
    }

    /**
     * Failed to try again
     */
    private void retryFailed() {
        if (failed.size() == 0) {
            return;
        }
        for (Map.Entry<RpcRequest, AbstractCluster> entry : new HashMap<RpcRequest, AbstractCluster>(failed).entrySet()) {
            RpcRequest rpcRequest = entry.getKey();
            AbstractCluster cluster = entry.getValue();
            try {
                cluster.invoke(rpcRequest);
                failed.remove(rpcRequest);
            } catch (Exception e) {
                logger.error("Failed retry to invoke method {}, waiting again.", rpcRequest.getMethodName(), e);
            }
        }
    }
}
